#!/usr/local/bin/python3
# _*_ coding: utf-8 _*_
# 持久化

import pymysql
import Config
import time
import json
from kafka import KafkaProducer
import redis


class PersistenceMysql():
    """MySQL persistence layer for infected-block records.

    Connection parameters are read from Config.PERSISTENCE["mysql"].
    """

    def __init__(self):
        self.host = Config.PERSISTENCE["mysql"]["host"]
        self.username = Config.PERSISTENCE["mysql"]["user"]
        self.password = Config.PERSISTENCE["mysql"]["password"]
        self.database = Config.PERSISTENCE["mysql"]["database"]
        self.table_name = Config.PERSISTENCE["mysql"]["table_name"]
        self.conn = pymysql.connect(host=self.host, user=self.username,
                                    password=self.password, database=self.database, charset="utf8")
        # pymysql only accepts the %s placeholder (it escapes/formats values
        # itself); the previous %d would raise "unsupported format character".
        self.insert_sql = "insert into " + self.table_name + \
            "(create_time,value) values(%s,%s)"
        self.block_sql = ""

    def pingpingping(self):
        # NOTE(review): was decorated @staticmethod while declaring `self`,
        # which made instance calls raise TypeError; plain instance method now.
        # todo: check whether the table exists and create it if it does not
        pass

    def item_serialize(self, item):
        """Serialize an item to a JSON string."""
        return json.dumps(item)

    def get_current_time(self):
        """Return the current Unix timestamp as an int (seconds)."""
        return int(time.time())

    # Save a record of a block (residential community) visited by an infected person.
    def save_infected_blocks(self, item):
        # Parameterized query: the previous str.format version was open to
        # SQL injection and broke on values containing quotes.
        sql = ("insert into inflected_block(id, p_name, c_name, county_name, "
               "block_name, source_url, city_lng, city_lat, block_lng, block_lat) "
               "values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)")
        params = (item.get("id"), item.get("province"), item.get("city"),
                  item.get("district"), item.get("community"),
                  item.get("sourceurl"), item.get("citylong"),
                  item.get("citylat"), item.get("communitylong"),
                  item.get("communitylat"))
        cursor = self.conn.cursor()
        try:
            cursor.execute(sql, params)
            self.conn.commit()
        except Exception as e:
            # Roll back so the connection stays usable after a failed insert.
            self.conn.rollback()
            print(e)
            # Log with the same keys the insert reads (the old message used
            # non-existent keys such as "p_name" and always printed None).
            print("插入小区数据 {}:{}:{}:{}:{} 失败".format(
                item.get("id"), item.get("province"), item.get("city"),
                item.get("district"), item.get("community")))

    # Return the total number of rows in the inflected_block table.
    def get_infected_block_count(self):
        sql = "select count(*) from inflected_block"
        cursor = self.conn.cursor()
        # Bug fix: execute() was previously called without the SQL statement.
        cursor.execute(sql)
        return cursor.fetchall()[0][0]

class KafkaWork():
    """Publishes notification messages to a Kafka topic.

    Topic and bootstrap servers come from Config.PERSISTENCE["kafka"].
    """

    def __init__(self):
        self.topic = Config.PERSISTENCE["kafka"]["topic"]
        self.server = Config.PERSISTENCE["kafka"]["server"]
        self.producer = KafkaProducer(bootstrap_servers=self.server)

    def send(self, key=None, value=None):
        """Send `value` (a str) to the configured topic.

        Returns True on success and False on failure.  `key` is currently
        unused; it is kept for backward compatibility with existing callers.
        """
        future = self.producer.send(
            self.topic, value=bytes(value, encoding="utf8"))
        try:
            # Block up to 10s for broker acknowledgement.
            meta = future.get(timeout=10)
        except Exception as e:
            print(e)
            print("kafka 通知失败")
            return False
        # Bug fix: the success path previously fell through and returned
        # None, so callers could not distinguish success from failure.
        return self.on_success(meta)

    def on_success(self, meta):
        # print("topic:{} offset: {} 发送成功！".format(meta.topic,meta.offset))
        return True


class PersistenceRedis():
    """Redis persistence for epidemic statistics: headline tip, per-country
    and per-city sorted sets, and Tencent-news totals / day-by-day trends.

    Connection parameters are read from Config.PERSISTENCE["redis"].
    """

    def __init__(self):
        # Storage keys form the wxopensvr:* namespace read by the dashboard.
        self.tip_key = "wxopensvr:tip"
        self.country_key = "wxopensvr:country"
        self.city_key = "wxopensvr:city"
        self.tc_total_key = "wxopensvr:tc:total"
        self.tc_add_key = "wxopensvr:tc:add"
        self.tc_day_key = "wxopensvr:tc:day_list"
        self.tc_day_add_key = "wxopensvr:tc:day_add_list"
        self.host = Config.PERSISTENCE["redis"]["host"]
        self.port = Config.PERSISTENCE["redis"]["port"]
        self.index = Config.PERSISTENCE["redis"]["index"]
        self.password = Config.PERSISTENCE["redis"]["password"]
        self.conn = redis.Redis(
            host=self.host, port=self.port, password=self.password, db=self.index)

    # Save the headline tip text.
    def save_tips(self, tip):
        self.conn.set(self.tip_key, tip)

    # Save per-country data as a sorted set scored by confirmed count.
    def save_countries(self, ll):
        # Syntax fix: the zadd mapping had an unbalanced parenthesis, which
        # made the module fail to import.  The delete now also goes through
        # the pipeline, consistent with save_cities below.
        pipe = self.conn.pipeline()
        pipe.delete(self.country_key)
        for i in ll:
            pipe.zadd(self.country_key,
                      {json.dumps(i): float(i.get("confirmedCount"))})
        pipe.execute()

    # Save domestic per-city data as a sorted set scored by confirmed count.
    def save_cities(self, ll):
        # Syntax fix: same unbalanced parenthesis as save_countries.
        pipe = self.conn.pipeline()
        pipe.delete(self.city_key)
        for i in ll:
            pipe.zadd(self.city_key,
                      {json.dumps(i): float(i.get("confirmedCount"))})
        pipe.execute()

    # Save the Tencent-news cumulative total.
    def save_tc_total(self, item):
        self.conn.set(self.tc_total_key, item)

    # Save the Tencent-news "new since yesterday" figure.
    def save_tc_add(self, item):
        self.conn.set(self.tc_add_key, item)

    # Save the Tencent-news "epidemic trend" series, scored by the numeric
    # form of the date string (e.g. "2.14" -> 214.0) so entries sort by day.
    def save_tc_day_list_item(self, ll):
        pipe = self.conn.pipeline()
        pipe.delete(self.tc_day_key)
        for i in ll:
            pipe.zadd(self.tc_day_key, {json.dumps(i): float(
                int("".join(i.get("date").split("."))))})
        pipe.execute()

    # Save the Tencent-news "daily new cases trend" series (same scoring).
    def save_tc_day_list_add_item(self, ll):
        pipe = self.conn.pipeline()
        pipe.delete(self.tc_day_add_key)
        for i in ll:
            pipe.zadd(self.tc_day_add_key, {json.dumps(
                i): float(int("".join(i.get("date").split("."))))})
        pipe.execute()



if __name__ == "__main__":
    # Smoke test: push two sample messages through the Kafka producer.
    worker = KafkaWork()
    for payload in ({"name": "标题", "content": "内容"},
                    {"name": "title", "content": "content"}):
        worker.send("央视新闻", json.dumps(payload))
